package custom;

import beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;

import java.util.Collections;
import java.util.List;

/**
 * 定义一个有状态的map操作，统计当前分区数据个数
 *
 * @author lvbingbing
 * @date 2022-01-05 00:42
 */
public class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {

    private static final long serialVersionUID = 7582426773570747333L;

    /**
     * 定义一个本地变量，作为算子状态
     */
    private Integer count = 0;

    @Override
    public Integer map(SensorReading sensorReading) throws Exception {
        count++;
        return count;
    }

    @Override
    public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Integer> state) throws Exception {
        // 状态恢复
        count = state.stream()
                .reduce(Integer::sum)
                .orElse(0);
    }
}
